跳到主要内容

SpringBoot 整合 RocketMQ

快速回顾

RocketMQ的核心组件概念如下:

Producer:生产发送消息 Broker:存储 Producer 发送过来的消息 Consumer:从 Broker 拉取消息并进行消费 NameServer:为 Producer 或 Consumer 路由到 Broker

其中消费流程有以下几点是必须注意的:

1、RocketMQ 的 Consumer 获取消息是通过向 Broker 发送拉取请求获取的,而不是由 Broker 发送 Consumer 接收的方式。 2、Consumer 每次拉取消息时消息都会被均匀分发到消息队列再进行传输,所以 RocketMQ 中的很多参数都是针对队列而不是 Topic 的,其中每个 Broker 消息队列(ConsumeQueue)的数量都可以通过 RocketMQ DashBoard 实时更改调整。

使用 Docker 配置环境

搭建 Rocket MQ 配置,这里使用 Docker

创建一个 docker-compose.yml 文件

version: '3'
services:
namesrv:
image: apacherocketmq/rocketmq
container_name: namesrv
ports:
- 19876:9876
volumes:
- /etc/localtime:/etc/localtime
- ./data/namesrv/logs:/home/rocketmq/logs
command: sh mqnamesrv
environment:
JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
broker:
image: apacherocketmq/rocketmq
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
- 10912:10912
volumes:
- /etc/localtime:/etc/localtime
- ./data/broker/logs:/home/rocketmq/logs
# - ./data/broker/store:/home/rocketmq/store
# - ./data/broker/broker.conf:/home/rocketmq/rocketmq-4.6.0/conf/broker.conf
command: sh mqbroker -n namesrv:9876 -c ../conf/broker.conf autoCreateTopicEnable=true
environment:
JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
depends_on:
- namesrv
rmqconsole:
image: apacherocketmq/rocketmq-console:2.0.0
container_name: rmqconsole
volumes:
- /etc/localtime:/etc/localtime
ports:
- 38080:8080
environment:
JAVA_OPTS: -Drocketmq.namesrv.addr=namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false -Duser.timezone='Asia/Shanghai'
depends_on:
- namesrv

这里加上这个设置时区是因为各个服务的时间不同步会导致 RocketMQ 错误(RocketMQ 控制台显示 This date have't data)

- /etc/localtime:/etc/localtime

注意:如果本地测试(WSL)就不要使用 Docker,应该使用 VM,然后进去修改配置文件

docker exec -it rmqbroker /bin/bash
bash
vi ../conf/broker.conf

加上

brokerIP1 = 虚拟机地址

构建一个 RocketMQ 的项目

注意 SpringBoot 的版本是 2.3

主要是导入 rocketmq-spring-boot-starter 的包,以及 spring-boot-starter-web 的包;

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>

编写配置文件

rocketmq:
name-server: localhost:9876
producer:
group: test-producer-group

简单使用例

点赞接口

PraiseRecord(点赞记录)

@Data
public class PraiseRecord implements Serializable {
private Long id;
private Long uid;
private Long liveId;
private LocalDateTime createTime;
}

注意要继承序列化接口

简单写个 API(生产者)

MessageController 简单的测试接口

@RestController
@RequestMapping("/message")
public class MessageController {
@Resource
private RocketMQTemplate rocketMQTemplate;

@PostMapping("/praise")
public ServerResponse praise(@RequestBody PraiseRecordVO vo) {
rocketMQTemplate.sendOneWay(RocketConstant.Topic.PRAISE_TOPIC, MessageBuilder.withPayload(vo).build());
return ServerResponse.success();
}

// ......

}

由于用户可以连续点赞,所以考虑可以在点赞消息的处理上宽松一点(容许消息丢失)以追求更高的性能,因此选择使用 sendOneWay() 进行消息发送。

点赞消息消费者

@Service
@RocketMQMessageListener(topic = RocketConstant.Topic.PRAISE_TOPIC, consumerGroup = RocketConstant.ConsumerGroup.PRAISE_CONSUMER)
@Slf4j
public class PraiseListener implements RocketMQListener<PraiseRecordVO>, RocketMQPushConsumerLifecycleListener {
@Resource
private PraiseRecordService praiseRecordService;

@Override
public void onMessage(PraiseRecordVO vo) {
praiseRecordService.insert(vo.copyProperties(PraiseRecord::new));
}

@Override
public void prepareStart(DefaultMQPushConsumer consumer) {
// 每次拉取的间隔,单位为毫秒
consumer.setPullInterval(2000);
// 设置每次从队列中拉取的消息数为16
consumer.setPullBatchSize(16);
}
}

单次 pull 消息的最大数目受 broker 存储的 MessageStoreConfig.maxTransferCountOnMessageInMemory(默认为32) 值限制,即若想要消费者从队列拉取的消息数大于 32 有效(pullBatchSize > 32)则需更改 Broker 的启动参数 maxTransferCountOnMessageInMemory 值。在 MQ 削峰的配置参数里,以下几个 DefaultMQPushConsumer 的参数是需要注意一下的:

pullInterval:每次从 Broker 拉取消息的间隔,单位为毫秒

pullBatchSize:每次从 Broker 队列拉取到的消息数,该参数很容易让人误解,一开始我以为是每次拉取的消息总数,但测试过几次后确认了实质上是从每个队列的拉取数,即 Consume 每次拉取的消息总数如下:EachPullTotal = 所有Broker上的写队列数和(writeQueueNums=readQueueNums) * pullBatchSize

consumeMessageBatchMaxSize:每次消费(即将多条消息合并为 List 消费)的最大消息数目,默认值为 1

发送消息的三种方式

从功能上来说,rocketmq支持三种发送消息的方式,分别是同步发送(sync),异步发送(async)和直接发送(oneway)。

在实际使用场景中,利用何种发送方式,可以总结如下:

  • 当发送的消息不重要时,采用 one-way 方式,以提高吞吐量;
  • 当发送的消息很重要是,且对响应时间不敏感的时候采用 sync 方式;
  • 当发送的消息很重要,且对响应时间非常敏感的时候采用 async 方式;

同步发送 sync

发送消息采用同步模式,这种方式只有在消息完全发送完成之后才返回结果,此方式存在需要同步等待发送结果的时间代价。

这种方式具有内部重试机制,即在主动声明本次消息发送失败之前,内部实现将重试一定次数,默认为2次(DefaultMQProducer#getRetryTimesWhenSendFailed)。 发送的结果存在同一个消息可能被多次发送给给 broker,这里需要应用的开发者自己在消费端处理幂等性问题。

案例代码如下:

public void sync() {
rocketMQTemplate.syncSend("topic-name", "send sync message !");
}

异步发送 async

发送消息采用异步发送模式,消息发送后立刻返回,当消息完全完成发送后,会调用回调函数 sendCallback 来告知发送者本次发送是成功或者失败。异步模式通常用于响应时间敏感业务场景,即承受不了同步发送消息时等待返回的耗时代价。

同同步发送一样,异步模式也在内部实现了重试机制,默认次数为2次(DefaultMQProducer#getRetryTimesWhenSendAsyncFailed)。发送的结果同样存在同一个消息可能被多次发送给 broker,需要应用的开发者自己在消费端处理幂等性问题。

public void async() {
rocketMQTemplate.asyncSend("topic-name", "send async message!", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("send successful");
}

@Override
public void onException(Throwable throwable) {
log.info("send fail; {}", throwable.getMessage());
}
});
}

直接发送 one-way

采用 one-way 发送模式发送消息的时候,发送端发送完消息后会立即返回,不会等待来自broker的ack来告知本次消息发送是否完全完成发送。这种方式吞吐量很大,但是存在消息丢失的风险,所以其适用于不重要的消息发送,比如日志收集。

案例代码如下:

public void oneWay() {
rocketMQTemplate.sendOneWay("topic-name", "send one-way message");
}

异常处理

显示 closeChannel: close the connection to remote address[] result: true

解决参考:docker安装rocketmq遇到的坑(外部无法连接)

修改了 IP 后 Rocket-Console 还报错则不用管它,已经能用了

Reference